Netty(四):Netty模式

您所在的位置:网站首页 ios 泛型 Netty(四):Netty模式

Netty(四):Netty模式

2022-12-19 14:58| 来源: 网络整理| 查看: 265

文章目录​​案例​​​​异步模型​​​​Future 说明​​​​Future-Listener 机制​​​​HTTP服务​​

Netty 主要基于主从 Reactors 多线程模型(如图)做了一定的改进,其中主从 Reactor 多线程模型有多个 Reactor

Netty(四):Netty模式_客户端

BossGroup 线程维护 Selector,只关注 Accecpt当接收到 Accept 事件,获取到对应的 SocketChannel,封装成 NIOScoketChannel 并注册到 Worker 线程(事件循环),并进行维护当 Worker 线程监听到 Selector 中通道发生自己感兴趣的事件后,就进行处理(就由 handler),注意 handler 已经加入到通道

进阶版

Netty(四):Netty模式_bootstrap_02

详细版

Netty(四):Netty模式_客户端_03

1.Netty抽象出两组线程池,BossGroup 负责客户端连接,WorkerGroup 负责网络的读写

2.BossGroup 和 WorkerGroup 类型都是NIOEventLoopGroup

3.NIOEventLoop 相当于一个事件循环组,这个组包含多个事件循环,每一个循环都是NIOEventLoop

4.NIOEventLoop 表示一个不断循环的执行处理任务的线程.每个NIOEventLoop都有一个Selector,用于监听绑定在其上的socket网络通讯

5.NioEventLoopGroup 可以有多个线程,即可以包含多个NioEventLoop

6.每个BossNioEventLoop 循环执行步骤有3步

轮询accept事件

处理accept事件,与client建立连接,生成NioScoketChannel,并将其注册到某个worker,NIOEventLoop上的Selectot

处理任务队列的任务,即runAllTasks

7.每个worker NIOEventLoop循环执行的步骤

轮询read,write事件

处理IO事件,即read,write事件,在对应NIOSocketChannel处理

处理任务队列的任务,即runALlTasks

8.每个Worker,NIOEventLoop处理业务时,会使用pipeline,pipline中包含了channel,通过pipline可以获取到对应管道,管道中维护了很多的处理器

案例

Netty 服务器在 6668 端口监听,客户端能发送消息给服务器"hello,服务器~"

服务器可以回复消息给客户端"hello,客户端~"

NettyServer

package com.zyd.netty;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;public class NettyServer { public static void main(String[] args) { /* 1.创建BossGroup 和 WorkerGroup 2. BossGroup只处理连接请求真正和客户端业务处理,交给WorkerGroup完成 3.两个都是无限循环 4. bossGroup 和 workerGroup 含有的子线程(NioEventLoop)的个数 实际 cpu核数 *2 */ EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); //8 try { //创建服务器端启动对象,配置参数 ServerBootstrap bootstrap = new ServerBootstrap(); //使用链式编程进行设置 bootstrap.group(bossGroup, workerGroup)//设置两个线程组 .channel(NioServerSocketChannel.class) //使用NioSocketChannel作为服务器通道实现 .option(ChannelOption.SO_BACKLOG, 128) //设置线程队列得到连接个数 .childOption(ChannelOption.SO_KEEPALIVE, true) //设置保持活动连接状态 .childHandler(new ChannelInitializer() { //创建一个通道初始化对象 protected void initChannel(SocketChannel socketChannel) throws Exception { //可以使用一个集合管理 SocketChannel, 再推送消息时,可以将业务加入到各个channel 对应的 NIOEventLoop 的 taskQueue 或者 scheduleTaskQueue System.out.println("客户socketChannel hashCode = " + socketChannel.hashCode()); socketChannel.pipeline().addLast(new NettyServerHandler()); } });// 给我们的workerGroup 的 EventLoop 对应的管道设置处理器 System.out.println("服务器 is ready "); //绑定一个端口,并且同步,生成一个ChannelFuture //启动服务器(并绑定端口) ChannelFuture cf = bootstrap.bind(6668).sync(); //给cf注册监听器,监控我们关心的事件 cf.addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture channelFuture) throws Exception { if (channelFuture.isSuccess()) { System.out.println("监听端口 6668 成功"); } else { System.out.println("监听端口 6668 失败"); } } }); cf.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }}

NettyServerHandler

package com.zyd.netty;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.Channel;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.channel.ChannelPipeline;import io.netty.util.CharsetUtil;/** * 自定义一个Handler,需要继续netty,规定好的某个HandlerAdapter * */public class NettyServerHandler extends ChannelInboundHandlerAdapter { // 读取数据实际 /* 1.ChannelHandlerContext ctx:上下文对象,含有 管道pipline,通道channel,地址 2. Object msg: 就是客户端发送的数据 默认Object */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("服务器读取线程 " + Thread.currentThread().getName() + " channle =" + ctx.channel()); System.out.println("server ctx =" + ctx); System.out.println("看看channel 和 pipeline的关系"); Channel channel = ctx.channel(); ChannelPipeline pipeline = ctx.pipeline(); //本质是一个双向链接, 出站入站 //将 msg 转成一个 ByteBuf //ByteBuf 是 Netty 提供的,不是 NIO 的 ByteBuffer. ByteBuf buf = (ByteBuf) msg; System.out.println("客户端发送消息是:" + buf.toString(CharsetUtil.UTF_8)); System.out.println("客户端地址:" + channel.remoteAddress()); } //数据读取完毕 @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { //writeAndFlush 是 write + flush //将数据写入到缓存,并刷新 //一般讲,我们对这个发送的数据进行编码 ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^^ω^^ω^^ω^^ω^^ω^


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3